[WIP] Optionally use mmaping of files with spilling #6516


Draft · wants to merge 3 commits into main

Conversation

jakirkham
Member

This allows the OS to handle mapping pages between file and memory. As a result if memory is getting full and some objects are not being used, the OS can free this memory up behind the scenes. Since the scratch directories being used for spilling are meant to at least be local disks (not NFS) or often solid state, IO between disk and memory can be quite fast. So reloading spilled objects is fairly reasonable.

  • Tests added / passed
  • Passes pre-commit run --all-files
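
A minimal sketch of the idea described above, outside the actual SpillBuffer code (the read_spilled helper below is hypothetical and for illustration only):

import mmap

def read_spilled(path: str, use_memmap: bool = False):
    # Hypothetical helper: map the spilled file instead of copying it into a
    # private bytes object, so the kernel can page it in and out on demand.
    with open(path, "rb") as f:
        if use_memmap:
            # The mapping stays valid after the file object is closed; pages
            # are loaded lazily and can be reclaimed under memory pressure.
            return memoryview(mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ))
        return f.read()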

jakirkham mentioned this pull request Jun 6, 2022

-def __init__(self, spill_directory: str, max_weight: int | Literal[False] = False):
+def __init__(self, spill_directory: str, max_weight: int | Literal[False] = False, memmap: bool = False):
jakirkham (Member Author)

Probably we want this in the config as well
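
Presumably something along these lines wherever Slow gets constructed (the config key name below is hypothetical; it does not exist yet and would also need entries in distributed.yaml and distributed-schema.yaml):

import dask
from distributed.spill import Slow

# Hypothetical key name, for illustration only.
memmap = dask.config.get("distributed.worker.memory.spill-memmap", default=False)
slow = Slow("spill-directory", memmap=memmap)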

github-actions bot commented Jun 7, 2022

Unit Test Results

15 files ±0 · 15 suites ±0 · 6h 34m 25s ⏱️ -3m 18s
2 852 tests +11: 2 770 ✔️ passed +16, 82 💤 skipped ±0, 0 failed -5
21 129 runs +78: 20 184 ✔️ passed +84, 945 💤 skipped -1, 0 failed -5

Results for commit 92ade52. ± Comparison against base commit c014e5b.

♻️ This comment has been updated with latest results.

@fjetter requested a review from crusaderky June 8, 2022 10:02
@jakirkham
Member Author

Should note this is incomplete as we need the config value noted above

super().__init__(
    partial(serialize_bytelist, on_error="raise"),
    deserialize_bytes,
-   zict.File(spill_directory),
+   zict.File(spill_directory, **file_kwargs),  # type: ignore
Collaborator

Style nitpick: please check for has_zict_210 instead like it already happens above
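
Presumably the suggestion looks something like this (a sketch only; whether zict.File accepts a memmap keyword, and which version gate is the right one, are assumptions here, based on the has_zict_210 flag already used in distributed/spill.py):

file_kwargs = {}
if has_zict_210:
    # Assumption: only newer zict releases understand this keyword.
    file_kwargs["memmap"] = memmap
super().__init__(
    partial(serialize_bytelist, on_error="raise"),
    deserialize_bytes,
    zict.File(spill_directory, **file_kwargs),
)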

Collaborator

(also, your branch from main is several months old)

@crusaderky
Collaborator

It doesn't seem to work:

import dask
import numpy
from distributed.spill import Slow

dask.config.set({"distributed.comm.compression": False})
d = Slow("zictdump", memmap=True)
d["x"] = numpy.random.random(2**27)
x = d["x"]  # RSS memory blows up by 1GB

The memory is allocated by deserialize_bytes.

Assuming the above is fixed, I can see several fundamental problems:

  • It cannot possibly work together with compression. There should be checks that disable compression if mmap is enabled, or vice versa. Also, the compression settings should be split between spill and network, so that you can have compression on for network transfers and off for spill. However, after "Deserialise data on demand" (#5900), this would cause uncompressed data to travel over the network even if compression is enabled for network transfers - with potentially very unpleasant performance implications.

  • This causes the fd to survive fh.close(). At any given time, this means you'll have additional open file descriptors equal to the number of tasks that are in fast but at some point in the past went through slow. This in turn could cause the process to hit its maximum number of fds. To make things worse, this is irrespective of size - a single-byte variable will cost you 1 fd.

  • SpillBuffer immediately pops the key from Slow upon transition from slow to fast. This in turn deletes the underlying file on disk. On Linux, the occupied space on disk remains there as long as there are open fds to it (see the previous point; a standalone demonstration follows this list).
    This change causes the available disk space to be reduced, without any possible insight (your local directory might as well be empty!), by the size of all tasks that are in fast but at some point in the past went through slow. This will cause the max_spill setting to be violated.

  • It's extremely inefficient if you spill the same key again, as it will be written to disk again even if it's already there.

  • The previous point will happen a lot, because the decision to spill is determined by distributed.worker.memory.threshold, which uses the uncapped sum(sizeof(v) for v in data.fast.values()) - except that data in fast may or may not be using RSS memory.

  • This will cause bogus readings in the GUI and in everything that uses memory metrics in decision-making (namely: the Active Memory Manager), as it will cause process memory to be substantially lower than managed (non-spilled) memory. E.g. you could now have sum(sizeof(v) for v in data.fast.values()) = 50GB, but only 5GB process, 0 spill, and 5GB managed_in_memory (because managed_in_memory is capped to the process memory). As a consequence you'll see the unmanaged memory nailed to zero, which is very misleading.
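
To make the disk-space point concrete, here is a tiny standalone demonstration (Linux/macOS; the file name is arbitrary):

import mmap
import os

# Write a 1 MiB file, map it, then close and delete it.
with open("demo.bin", "wb") as f:
    f.write(b"x" * 2**20)

f = open("demo.bin", "rb")
m = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
f.close()                # the mapping keeps its own reference to the file
os.remove("demo.bin")    # the directory entry is gone...
assert m[:4] == b"xxxx"  # ...but the data is still readable through the mapping,
                         # and its blocks stay allocated on disk until m.close()
m.close()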


In conclusion:

I think it would be very useful to take a step back and analyse what, exactly, we are trying to achieve here, that the current explicit spill system doesn't already do, and come up with a design as a consequence.

@jakirkham
Member Author

jakirkham commented Jun 9, 2022

As noted above, I'm not sure this is ready for review. That said, I appreciate the feedback.

Edit: This came out of the discussion in PR (#6503)

@jakirkham
Member Author

Am curious why compression wouldn't work. As compressors support the Python buffer protocol, as does mmap, I would expect this to work, but maybe I'm missing something.

@crusaderky
Collaborator

crusaderky commented Jun 9, 2022

If you run serialize_bytelist on a 100 MiB numpy array, you'll get a tuple of 3 frames:

  • dask-specific header (<1 kiB)
  • pickle output containing the numpy metadata (<1 kiB)
  • PickleBuffer (100 MiB)

The 3 buffers are then written sequentially to a file in your local directory.

When you load it back,

  1. mmap creates a buffer that is not actually consuming memory
  2. deserialize_bytes eagerly loads the first 2 frames (header + pickle data) and passes them to pickle.loads. The third frame, which contains the actual numpy buffer, is wrapped by a PickleBuffer as a view of the mmap'ed data, so it's not loaded into memory.
  3. portions of your third frame will load into memory only when something actually reads that specific area of the numpy array.
  4. the Linux kernel can subsequently decide to spill out mmap'ed pages of your numpy array.

This is the theory - in practice, I showed above that there's a bug somewhere that prevents this.
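
A standalone illustration of the intended uncompressed path (not the actual deserialize_bytes code; numpy stands in for the spilled payload):

import mmap
import numpy as np

# Write an 8 MiB frame to disk, then rebuild an array on top of a mapping of it.
with open("frame.bin", "wb") as f:
    f.write(np.random.random(2**20).tobytes())

with open("frame.bin", "rb") as f:
    buf = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)

arr = np.frombuffer(buf, dtype=np.float64)  # a view over the mapping, no copy
arr[:10].sum()                              # only the pages actually touched are faulted in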

With compression, the second and third buffers are passed through lz4.compress() before you write them to disk.
When you load them back,

  1. mmap creates a buffer that is not actually consuming memory
  2. deserialize_bytes passes the second and third frame to lz4.decompress(), which eagerly loads them into memory, creates new decompressed frames, and then descopes the input mmap'ed data.
  3. you now have a vanilla numpy array, not backed by mmap.
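
The same steps with a compressed frame show why the mapping is lost (zlib stands in for lz4 here; the mechanics are the same):

import mmap
import zlib

with open("frame.z", "wb") as f:
    f.write(zlib.compress(b"x" * 2**20))

with open("frame.z", "rb") as f:
    buf = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)

frame = zlib.decompress(buf)  # a fresh bytes object, fully resident in RAM;
                              # nothing references the mmap'ed input anymore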

@jakirkham
Member Author

Right, decompression would force it into memory.

Thought "not work" meant using the mmap would run into some other bug.
